package com.lzwk.app.dwd;

/**
 * @Author: CC
 * @Date: 2022/1/5 17:57
 * ods-->dwd
 * 维度表写入hbase,事实表写入kafka
 */

import com.lzwk.utils.MyKafkaUtil;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BaseDBApp {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //1.1 设置CK&状态后端
        env.setStateBackend(new EmbeddedRocksDBStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://10.2.16.4:4007/flink_113/checkpoints/");
        env.enableCheckpointing(5000L);
        //env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //env.getCheckpointConfig().setCheckpointTimeout(10000L);
        //env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);

        String sourceTopic = "ods_base_db";
        String groupId = "base_db_app_210325";
        String brokers = "127.0.0.1:9002";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId, brokers));
    }


}
